/*
* @Author: 窦子滨<zibin_5257@163.com>
* @Date: 2019-04-10
 */
package TcpPack

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"io"
	"net"
	"strings"
	"time"
)

//在单独的线程中处理充电桩长连接
func HandleConn(Conn net.Conn) {
	conn := WriteConn(Conn)

	defer Conn.Close()
	//接收数据线程
	go conn.ReadSocket()
	//发送数据线程
	go conn.WriteSocket()
	//进行心跳的检测
	go func(conn *HpCharge) {
		ForEnd:
		for {
			select {
			case <-time.After(time.Duration(35) * time.Second):
				Log.Error("[%s][%d][失败：心跳0x01检测超时][未在30秒上传心跳报文]",conn.ConnId,conn.DeviceId)
			case <-conn.chanHealth:
				Log.Info("[%s][%d][通过：30秒收到心跳0x01报文][]",conn.ConnId,conn.DeviceId)
			case <-conn.ctx.Done():
				Log.Info("[%s][%d][心跳0x01检测终止][]",conn.ConnId,conn.DeviceId)
				break ForEnd
			}
		}
	}(conn)

	//登录指令检测 连接建立成功 需要在30s内发起第一次登录
	go func(conn *HpCharge) {
		select {
		case <-time.After(time.Duration(30) * time.Second):
			Log.Error("[%s][%d][失败：登录0x02检测超时][未在连接成功后20秒内发起登录报文]",conn.ConnId,conn.DeviceId)
		case <-conn.chanLogin:
			Log.Info("[%s][%d][通过：登录0x02][在规定时间内首次上报登录报文]",conn.ConnId,conn.DeviceId)
		case <-conn.ctx.Done():
			Log.Info("[%s][%d][登录0x02检测终止][]",conn.ConnId,conn.DeviceId)
		}
	}(conn)

	//主动对时线程
	go conn.serverTime()
	//执行测试线程
	go TestHandle(conn)

ForEnd:
	for {
		select {
		//case readStr := <-conn.ReadChan:
		//conn.WriteChan <- readStr
		case <-conn.ctx.Done():
			break ForEnd
		}
	}

	//删除连接对象
	conn.DeleteConn()

	Log.Info("[%s][%d][充电桩连接关闭][]", conn.ConnId, conn.DeviceId)
}

//读取线程
func (conn *HpCharge) ReadSocket() {
	for {
		data := make([]byte, 1024)
		len1, err := conn.Conn.Read(data)
		if err != nil {
			//是否结束
			if err == io.EOF {
				Log.Notice("[%s][%d][Socket读取出错,关闭][%s]", conn.ConnId, conn.DeviceId, err.Error())
				break
			}
			//是否为网络关闭
			if strings.Contains(err.Error(), "use of closed network connection") {
				Log.Notice("[%s][%d][Socket读取出错,网络关闭][%s]", conn.ConnId, conn.DeviceId, err.Error())
				break
			}

			//读取超时  视为异常断开
			if strings.Contains(err.Error(), "i/o timeout") {
				Log.Notice("[%s][%d][Socket读取出错,读取超时关闭][%s]", conn.ConnId, conn.DeviceId, err.Error())
				break
			}

			Log.Notice("[%s][%d][Socket读取出错，其他错误并关闭连接][%s]", conn.ConnId, conn.DeviceId, err.Error())
			break
		}
		conn.Conn.SetDeadline(time.Now().Add(90 * time.Second))

		//fmt.Println("接收数据长度：", len1)
		result := make([]byte, len1)
		result = data[:len1]

		reader := bytes.NewReader(result)

		result = nil

		//用于处理粘包
		scanner := bufio.NewScanner(reader)
		scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
			if !atEOF && data[0] == 117 && data[1] == 114 {
				if len(data) > 4 { // 如果收到的数据>4个字节(2字节版本号+2字节数据包长度)
					length := int16(0)
					binary.Read(bytes.NewReader(data[2:4]), binary.LittleEndian, &length) // 读取数据包第3-4字节(int16)=>数据部分长度
					//fmt.Println(length)
					if int(length)+2 <= len(data) { // 如果读取到的数据正文长度+2字节版本号长度不超过读到的数据(实际上就是成功完整的解析出了一个包)
						return int(length) + 2, data[:int(length)+2], nil
					}
				}
			}
			return
		})

		for scanner.Scan() {
			go conn.HandleInstruct(scanner.Bytes())
			//fmt.Println(scanner.Bytes())
		}

		if scanner.Err() != nil {
			Log.Error("[%s][%d][数据流拆分出错][%s]", conn.ConnId, conn.DeviceId, scanner.Err().Error())
		}
		//conn.ReadChan <- result

		data = make([]byte, 1024)
	}
	Log.Error("[%s][%d][读线程退出][]", conn.ConnId, conn.DeviceId)
	conn.Cancel()
}

//写入线程
func (conn *HpCharge) WriteSocket() {
ForEnd:
	for {
		select {
		case strData := <-conn.WriteChan:
			_, err := conn.Conn.Write(strData)
			if err != nil {
				Log.Emergency("[%s][%d][tcp写入数据失败][%s]", conn.ConnId, conn.DeviceId, err.Error()+"："+hex.EncodeToString(strData))
				continue
			}
			conn.Conn.SetDeadline(time.Now().Add(90 * time.Second))
			Log.Info("[%s][%d][tcp发送数据][%s]", conn.ConnId, conn.DeviceId, hex.EncodeToString(strData))
		case <-conn.ctx.Done():
			break ForEnd
		}
	}
	Log.Info("[%s][%d][写入线程退出][]", conn.ConnId, conn.DeviceId)
}

//服务器每隔60分钟主动下发对时指令
func (conn *HpCharge) serverTime() {
ForEnd:
	for {
		select {
		//测试环境每间隔5s  发送一次对时请求
		case <-time.After(time.Duration(60) * time.Minute):
			conn.sendServerTime()
		case <-conn.ctx.Done():
			break ForEnd
		}
	}
	Log.Info("[%s][%d][对时线程退出][]", conn.ConnId, conn.DeviceId)
}

//服务端主动下发对时数据
func (conn *HpCharge) sendServerTime() {
	//对时数据
	login := &TimePack{
		Time: time.Now().Format("2006-01-02 15:04:05"),
	}

	Log.Info("[%s][%d][服务端下发对时数据][%+v]", conn.ConnId, conn.DeviceId, login)

	buf := new(bytes.Buffer)
	login.Pack(buf)

	f := PackAll(conn.DeviceId, "08", "0", "0", 0, buf.Bytes())

	conn.WriteData(f)

	go func(conn *HpCharge) {
		select {
		case <-time.After(time.Duration(5) * time.Second):
			Log.Error("[][][下发对时请求充电桩未在规定时间内回复][]")
		case <-conn.chanTime:
			Log.Info("[][][通过：成功收到对时回复][]")
		}
	}(conn)
}

//指令处理 根据不同的指令进行不同的处理逻辑
func (conn *HpCharge) HandleInstruct(result []byte) {

	p := &Pack{
		Data: result,
	}

	base := new(PackBase)
	base.Unpack(bytes.NewReader(result), len(result))

	Log.Debug("[%s][%d-%s][接收到的十六进制数据][%s]", conn.ConnId, conn.DeviceId, base.GunId, hex.EncodeToString(result))
	Log.Debug("[%s][%d-%s][接收到的基础数据解析结果][%+v]", conn.ConnId, base.DeviceId, base.GunId, base)
	Log.Debug("[%s][%d][开始指令处理][%s]", conn.ConnId, conn.DeviceId, base.Direct)

	if p.CheckSum(base.SignSum) != nil {
		Log.Notice("[%s][%d][接收数据校验和失败][报文抛弃]", conn.ConnId, base.DeviceId)
		return
	}

	if p.CheckLenght(base.Length) != nil {
		Log.Notice("[%s][%d][接收数据长度检测失败][报文抛弃]", conn.ConnId, base.DeviceId)
		return
	}

	//不是登陆和检测不通过
	if base.Direct != "02" && !CheckDeviceIdConn(base.DeviceId) {
		Log.Error("[%s][%d][非法登录检测][终止]", conn.ConnId, base.DeviceId)
		conn.Cancel()
	}

	//不同的指令处理
	switch base.Direct {
	case "01": //心跳
		base.health(conn)
	case "02": //登录
		base.login(conn)
	case "03":
		Log.Info("[%s][%d][充电桩主动退出][]", conn.ConnId, base.DeviceId)
		conn.Cancel()
	case "05": //服务端下发写入数据 充电桩回复
		base.writeDataReply(conn)
	case "06": //充电桩主动上传历史记录 在该方法中处理结账流程
		base.historyWrite(conn)
	case "07": //充电桩主动上传告警数据
		base.warnWrite(conn)
	case "08": //对时回复
		base.timeRespond(conn)
	case "09": //主站采集实时数据 充电桩回复
		base.readRealTimeReply(conn)
	case "10": //充电桩启停回复
		base.startStopReply(conn)
	case "14": //充电桩主动上传实时数据处理
		base.realTime(conn)
	case "30": //充电桩主动刷卡请求
		//base.cardRequest(conn) //以前的方法处理
	case "41": //充电桩获取浮点费率
		Log.Info("[%s][%d][充电桩主动获取浮点费率][]：", conn.ConnId, conn.DeviceId)
		base.getRate(conn, "41")
	case "45": //充电桩上报充电命令
		base.reportCharge(conn)
	}
}

//充电桩连接最后存活时间检测 超过半小时未活跃 自动断开并删除连接池 防止失效连接池内存泄露 每小时重复检测一次
func ChargeConnCheck() {
begin:
	//1小时之后执行
	time.Sleep(time.Duration(60) * time.Minute)
	for k, v := range ChargeConn {
		//如果发现超过半小时未心跳 则删除连接池中的连接
		if time.Now().Unix()-v.HealthLastTime > 1800 {
			//取消线程守护
			v.Cancel()
			ChargeConnRW.Lock()
			delete(ChargeConn, k)
			ChargeConnRW.Unlock()
		}
	}
	goto begin
}
